// 实现消息转发子服务
#include <brpc/server.h>
#include <butil/logging.h>

#include "etcd.hpp"     // 服务注册模块封装
#include "logger.hpp"   // 日志模块封装
#include "rabbitmq.hpp"
#include "channel.hpp"
#include "utils.hpp"
#include "mysql_chat_session_member.hpp"

#include "base.pb.h"       // protobuf框架代码
#include "user.pb.h"       // protobuf框架代码
#include "transmite.pb.h"  // protobuf框架代码

namespace suye
{
/// RPC implementation of the message-transmit sub-service.
///
/// For each incoming message it looks up the sender through the user
/// sub-service, assembles a complete MessageInfo, publishes the serialized
/// message to the message queue, and returns the message together with the
/// ids obtained from the chat-session member table.
class MsgTransmitServiceImpl : public suye::MsgTransmitService {
public:
    /// @param user_service_name  name passed to ServiceManager::Choose() to pick a user sub-service channel
    /// @param channels           channel manager used to reach the user sub-service
    /// @param mysql_client       database handle used to build the chat-session member table accessor
    /// @param exchange_name      exchange the assembled message is published to
    /// @param routing_key        routing key used when publishing
    /// @param mq_client          message-queue client used for publishing
    MsgTransmitServiceImpl(const std::string& user_service_name,
        const ServiceManager::ptr& channels,
        const std::shared_ptr<odb::core::database>& mysql_client,
        const std::string& exchange_name,
        const std::string& routing_key,
        const MQClient::ptr& mq_client)
        : _user_service_name(user_service_name)
        , _sm_channels(channels)
        , _mysql_session_member_table(std::make_shared<ChatSessionMemberTable>(mysql_client))
        , _exchange_name(exchange_name)
        , _routing_key(routing_key)
        , _mq_client(mq_client) {}

    ~MsgTransmitServiceImpl() {}
    
    /// Builds the full message for a NewMessageReq and reports who it must be forwarded to.
    ///
    /// On success the response carries the request id, success=true, the
    /// assembled message and the target id list. On any failure the response
    /// carries the request id, success=false and an error message.
    ///
    /// @param controller  unused by this implementation
    /// @param request     supplies request id, user id, chat session id and message content
    /// @param response    filled in as described above
    /// @param done        completion closure; run automatically when this method returns
    virtual void GetTransmitTarget(google::protobuf::RpcController* controller,
                       const ::suye::NewMessageReq* request,
                       ::suye::GetTransmitTargetRsp* response,
                       ::google::protobuf::Closure* done)
    {
        // Runs `done` when this scope exits, whichever return path is taken.
        brpc::ClosureGuard rpc_guard(done);

        // Fills the response for a failed request.
        auto err_response = [response](const std::string& rid,
            const std::string& errmsg) -> void {
                response->set_request_id(rid);
                response->set_success(false);
                response->set_errmsg(errmsg);
            };

        // 0. Pull the key fields out of the request: request id, user id,
        //    owning chat session id and the message content.
        const std::string& rid = request->request_id();
        const std::string& uid = request->user_id();
        const std::string& chat_ssid = request->chat_session_id();
        const MessageContent& content = request->message();

        // 1. Assemble the message. The sender information comes from the user sub-service.
        auto channel = _sm_channels->Choose(_user_service_name);
        if (!channel) {
            LOG_ERROR("{} 没有可供访问的用户子服务节点！", _user_service_name);
            return err_response(rid, "没有可供访问的用户子服务节点！");
        }
        UserService_Stub stub(channel.get());
        GetUserInfoReq req;
        req.set_request_id(rid);
        req.set_user_id(uid);

        GetUserInfoRsp rsp;
        brpc::Controller cntl;
        // A null closure makes this a synchronous call.
        stub.GetUserInfo(&cntl, &req, &rsp, nullptr);
        if (cntl.Failed() || !rsp.success()) {
            LOG_ERROR("用户子服务调用失败！{}-{}", _user_service_name, cntl.ErrorText());
            return err_response(rid, "用户子服务调用失败!");
        }

        MessageInfo message;
        message.set_message_id(uuid());
        message.set_chat_session_id(chat_ssid);
        message.set_timestamp(time(nullptr));
        message.mutable_sender()->CopyFrom(rsp.user_info());
        message.mutable_message()->CopyFrom(content);

        // 2. Fetch the list of ids the message has to be forwarded to.
        auto target_list = _mysql_session_member_table->members(chat_ssid);

        // 3. Publish the assembled message to the message queue so that the
        //    message-storage sub-service can persist it.
        const bool published = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);
        if (!published) {
            // Log the request id: the brpc controller above belongs to the
            // already-successful user-service call and carries no information
            // about a message-queue failure.
            LOG_ERROR("持久化消息发布失败！{}", rid);
            return err_response(rid, "持久化消息发布失败！");
        }

        // 4. Build the success response.
        response->set_request_id(rid);
        response->set_success(true);
        response->mutable_message()->CopyFrom(message);
        for (const auto& id : target_list) {
            response->add_target_id_list(id);
        }
    }
    
private:
    // User sub-service: its service name and the channel manager used to reach it.
    std::string _user_service_name;
    ServiceManager::ptr _sm_channels;
    // Accessor for the chat-session member table.
    ChatSessionMemberTable::ptr _mysql_session_member_table;
    // Message-queue publishing parameters and client handle.
    std::string _exchange_name;
    std::string _routing_key;
    MQClient::ptr _mq_client;
};

/// Holds the components of the message-transmit server and runs its RPC server.
///
/// The object only keeps shared ownership of the components it is given;
/// the RPC server passed in is run as-is by start().
class MsgTransmitServer {
public:
    using ptr = std::shared_ptr<MsgTransmitServer>;

    /// @param service_discoverer  service-discovery client
    /// @param reg_client          service-registration client
    /// @param mysql_client        MySQL database client
    /// @param rpc_server          brpc server that start() will run
    MsgTransmitServer(const Discovery::ptr& service_discoverer,
                      const Registry::ptr& reg_client,
                      const std::shared_ptr<odb::core::database>& mysql_client,
                      const std::shared_ptr<brpc::Server>& rpc_server)
        : _service_discoverer(service_discoverer),
          _reg_client(reg_client),
          _mysql_client(mysql_client),
          _rpc_server(rpc_server)
    {
    }

    ~MsgTransmitServer() = default;

    /// Blocks the calling thread in brpc::Server::RunUntilAskedToQuit().
    void start()
    {
        _rpc_server->RunUntilAskedToQuit();
    }

private:
    Discovery::ptr _service_discoverer;                  // service-discovery client
    Registry::ptr _reg_client;                           // service-registration client
    std::shared_ptr<odb::core::database> _mysql_client;  // MySQL database client
    std::shared_ptr<brpc::Server> _rpc_server;           // RPC server run by start()
};

/// Step-by-step builder for MsgTransmitServer.
///
/// Call order matters: make_mysql_object(), make_discovery_object() and
/// make_mq_object() must have run before make_rpc_server(), and
/// make_discovery_object(), make_registry_object() and make_rpc_server()
/// must have run before build(). A missing prerequisite is logged and the
/// process is aborted.
class MsgTransmitServerBuilder {
public:
    /// Creates the MySQL client through ODBFactory::create(); all arguments
    /// are forwarded unchanged.
    void make_mysql_object(
        const std::string& user,
        const std::string& pswd,
        const std::string& db,
        const std::string& host,
        const std::string& cset,
        int port,
        int conn_pool_count) 
    {
        _mysql_client = ODBFactory::create(user, pswd, db, host, cset, port, conn_pool_count);
    }

    /// Creates the channel manager and the service-discovery client.
    ///
    /// @param reg_host           forwarded to the Discovery constructor
    /// @param base_service_name  forwarded to the Discovery constructor
    /// @param user_service_name  service declared on the channel manager and
    ///                           later handed to the RPC service implementation
    void make_discovery_object(const std::string& reg_host,
        const std::string& base_service_name,
        const std::string& user_service_name) {
        _user_service_name = user_service_name;
        _sm_channels = std::make_shared<ServiceManager>();
        _sm_channels->Declared(user_service_name);
        LOG_DEBUG("设置用户子服务为需添加管理的子服务：{}", user_service_name);
        // Bind the shared_ptr itself rather than a raw pointer: the callbacks
        // are handed to Discovery, which is passed on to MsgTransmitServer and
        // may outlive every other owner of the channel manager. Holding a
        // shared_ptr inside the bound callbacks keeps the manager alive for as
        // long as the callbacks exist.
        auto put_cb = std::bind(&ServiceManager::OnServiceOnline, _sm_channels, std::placeholders::_1, std::placeholders::_2);
        auto del_cb = std::bind(&ServiceManager::OnServiceOffline, _sm_channels, std::placeholders::_1, std::placeholders::_2);
        _service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);
    }

    /// Creates the service-registration client and registers this service.
    ///
    /// @param reg_host     forwarded to the Registry constructor
    /// @param service_name forwarded to Registry::registry()
    /// @param access_host  forwarded to Registry::registry()
    void make_registry_object(const std::string& reg_host,
        const std::string& service_name,
        const std::string& access_host) {
        _registry_client = std::make_shared<Registry>(reg_host);
        _registry_client->registry(service_name, access_host);
    }

    /// Creates the RabbitMQ client and declares the exchange/queue/binding.
    ///
    /// The exchange name and binding key are remembered; the binding key is
    /// later used as the routing key by the RPC service implementation.
    void make_mq_object(const std::string& user, 
        const std::string& passwd,
        const std::string& host,
        const std::string& exchange_name,
        const std::string& queue_name,
        const std::string& binding_key)  
    {
        _routing_key = binding_key;
        _exchange_name = exchange_name;
        _mq_client = std::make_shared<MQClient>(user, passwd, host);
        _mq_client->declareComponents(exchange_name, queue_name, binding_key);
    }

    /// Creates the brpc server, installs the transmit service and starts listening.
    ///
    /// Requires make_mq_object(), make_discovery_object() and
    /// make_mysql_object() to have been called; aborts otherwise. Also aborts
    /// if the service cannot be added or the server cannot be started.
    ///
    /// @param port         port passed to brpc::Server::Start()
    /// @param timeout      assigned to brpc::ServerOptions::idle_timeout_sec
    /// @param num_threads  assigned to brpc::ServerOptions::num_threads
    void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads) 
    {
        if (!_mq_client) {
            LOG_ERROR("还未初始化消息队列客户端模块！");
            abort();
        }
        if (!_sm_channels) {
            LOG_ERROR("还未初始化信道管理模块！");
            abort();
        }
        if (!_mysql_client) {
            LOG_ERROR("还未初始化Mysql数据库模块！");
            abort();
        }

        _rpc_server = std::make_shared<brpc::Server>();

        // Raw allocation is intentional: ownership is handed to the brpc
        // server through SERVER_OWNS_SERVICE below.
        MsgTransmitServiceImpl *transmite_service = new MsgTransmitServiceImpl(
            _user_service_name, _sm_channels, _mysql_client, _exchange_name, _routing_key, _mq_client);

        // brpc reports success as 0, so treat every other value as failure.
        int ret = _rpc_server->AddService(transmite_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);
        if (ret != 0) {
            LOG_ERROR("添加Rpc服务失败！");
            abort();
        }
        brpc::ServerOptions options;
        options.idle_timeout_sec = timeout;
        options.num_threads = num_threads;
        ret = _rpc_server->Start(port, &options);
        if (ret != 0) {
            LOG_ERROR("服务启动失败！");
            abort();
        }
    }

    /// Assembles the MsgTransmitServer from the previously created components.
    ///
    /// Requires make_discovery_object(), make_registry_object() and
    /// make_rpc_server() to have been called; aborts otherwise.
    MsgTransmitServer::ptr build() 
    {
        if (!_service_discoverer) {
            LOG_ERROR("还未初始化服务发现模块！");
            abort();
        }
        if (!_registry_client) {
            LOG_ERROR("还未初始化服务注册模块！");
            abort();
        }
        if (!_rpc_server) {
            LOG_ERROR("还未初始化RPC服务器模块！");
            abort();
        }
        return std::make_shared<MsgTransmitServer>(
            _service_discoverer, _registry_client, _mysql_client, _rpc_server);
    }

private:
    // User sub-service name, channel manager and discovery client.
    std::string _user_service_name;
    ServiceManager::ptr _sm_channels;
    Discovery::ptr _service_discoverer;
    
    // Message-queue publishing parameters and client.
    std::string _routing_key;
    std::string _exchange_name;
    MQClient::ptr _mq_client;

    Registry::ptr _registry_client;                      // service-registration client
    std::shared_ptr<odb::core::database> _mysql_client;  // MySQL database client
    std::shared_ptr<brpc::Server> _rpc_server;           // RPC server
};
}